package base_spark

import java.io.StringReader

import com.opencsv.CSVReader
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Composite sort key for two-level ("secondary") sorting: orders by `first`,
 * then by `sec` to break ties. Serializable so Spark can ship it to executors.
 *
 * @param first primary sort component
 * @param sec   secondary sort component (tie-breaker)
 */
class SecOrder(val first: Int, val sec: Int) extends Ordered[SecOrder] with Serializable {

    // Use Integer.compare rather than subtraction: `a - b` overflows for
    // large-magnitude operands (e.g. Int.MinValue - 1 wraps positive),
    // which would silently corrupt the ordering.
    override def compare(that: SecOrder): Int =
        if (this.first != that.first) Integer.compare(this.first, that.first)
        else Integer.compare(this.sec, that.sec)

    // Same rendering as before: "<className>[<first>,<sec>]".
    override def toString: String = s"${this.getClass.getName}[${this.first},${this.sec}]"
}

/**
 * A collection of small Spark RDD exercises: top-N selection, min/max
 * aggregation, secondary sort, and a CSV join over the MovieLens data set.
 * All methods print their results to stdout on the driver.
 */
class TopN {
    /** Prints the five highest payment values (3rd CSV column) in the file, ranked. */
    def topPayment(path: String): Unit = {
        TopN.sc.textFile(path)
            .map(_.split(",")(2).trim.toFloat)
            .sortBy(item => item, ascending = false)
            .take(5) // take() collects to the driver, so printing below is driver-side
            .zipWithIndex // replaces the mutable rank counter
            .foreach { case (payment, idx) =>
                println(String.format("第%s名 payment:%s", (idx + 1).toString, payment.toString))
            }
    }

    /** Prints the (max, min) payment across all well-formed lines under `path`. */
    def getPaymentMax(path: String): Unit = {
        val payments = TopN.sc.textFile(path)
            .map(_.split(","))
            // keep only 4-column rows whose payment field is not a lone space
            .filter(line => line.length == 4 && line(2) != " ")
            .map(_(2).toFloat)
        // Single-pass reduce instead of shuffling every value onto one
        // groupByKey key (which sent the whole data set to a single task).
        // The isEmpty guard preserves the old behavior of printing nothing
        // when there is no data.
        if (!payments.isEmpty()) {
            val (max, min) = payments
                .map(v => (v, v))
                .reduce((a, b) => (math.max(a._1, b._1), math.min(a._2, b._2)))
            println((max, min))
        }
    }

    /** Secondary sort of order.txt lines ("a b") by first then second field, descending. */
    def orderIt(path: String): Unit = {
        TopN.sc.textFile(path)
            .filter(_.split(" ").length == 2)
            .map { line =>
                // split once instead of once per field
                val parts = line.split(" ")
                (new SecOrder(parts(0).toInt, parts(1).toInt), line)
            }
            .sortByKey(ascending = false)
            .values
            .collect()
            .foreach(println)
    }

    /** Prints (average rating, title) for every movie averaging above 4, highest first. */
    def movie(ratingPath: String, moviePath: String): Unit = {
        // A function VALUE (not a method) so the Spark closures below don't
        // capture the non-serializable enclosing TopN instance.
        val parseCsvLine: String => List[String] = { line =>
            val reader = new CSVReader(new StringReader(line))
            try reader.readNext().toList // try/finally: close even if parsing throws
            finally reader.close()
        }

        // (movieId, average rating) restricted to averages above 4.
        // Note: no sort here — the join below would discard the order anyway;
        // the final sortByKey provides the ordering.
        val ratingRdd = TopN.sc.textFile(ratingPath)
            .map(parseCsvLine)
            .filter(item => !item.contains("userId")) // drop the CSV header row
            .map(item => (item(1), item(2).toFloat))
            .groupByKey()
            .mapValues(ratings => ratings.sum / ratings.size.toFloat)
            .filter(_._2 > 4)

        // (movieId, title)
        val movieRdd = TopN.sc.textFile(moviePath)
            .map(parseCsvLine)
            .filter(item => !item.contains("movieId")) // drop the CSV header row
            .map(item => (item(0), item(1)))

        // Print every (rating, Option[title]) pair, highest rating first.
        // collect() before foreach so printing happens on the driver
        // (a bare RDD.foreach(println) prints on executors), matching the
        // other methods in this class.
        ratingRdd.leftOuterJoin(movieRdd)
            .values
            .sortByKey(ascending = false)
            .collect()
            .foreach(println)
    }
}


object TopN {
    // Shared Spark configuration/context for all TopN jobs: local mode,
    // using all available cores.
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local[*]")
        .setAppName("计算平均值")
    val sc = new SparkContext(conf)

    def main(args: Array[String]): Unit = {
        try {
            //        new TopN().topPayment(base_spark.util.Utils.getResourcePath("/txt/file1.txt"))
            //        new TopN().getPaymentMax(base_spark.util.Utils.getResourcePath("/txt"))
            //        new TopN().orderIt(base_spark.util.Utils.getResourcePath("/txt/order.txt"))
            new TopN().movie(
                base_spark.util.Utils.getResourcePath("/movie_1m/ratings.csv"),
                base_spark.util.Utils.getResourcePath("/movie_1m/movies.csv")
            )
        } finally {
            // Release the SparkContext (executors, UI port, temp dirs) even if
            // the job throws — it was previously never stopped.
            sc.stop()
        }
    }
}